package com.xxljob.demo.core.thread;

import com.xxljob.demo.core.conf.XxlJobAdminConfig;
import com.xxljob.demo.core.trigger.TriggerTypeEnum;
import com.xxljob.demo.core.trigger.XxlJobTrigger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @Author：YH
 * @Description：触发器
 * @Date:Created in 2022/5/19 22:43
 */
public class JobTriggerPoolHelper {

    private static Logger LOGGER = LoggerFactory.getLogger(JobTriggerPoolHelper.class);

    // fast/slow thread pool
    private ThreadPoolExecutor fastTriggerPool = null;

    private ThreadPoolExecutor slowTriggerPool = null;

    private volatile ConcurrentHashMap<Integer, AtomicInteger> jobTimeoutCountMap = new ConcurrentHashMap<>();

    private volatile long minTim = System.currentTimeMillis() / 60000;

    public void start() {
        fastTriggerPool = new ThreadPoolExecutor(
                10,
                XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),
                60L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(1000),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());
                    }
                });

        slowTriggerPool = new ThreadPoolExecutor(
                10,
                XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),
                60L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(2000),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());
                    }
                });
    }


    // ---------------------- helper ----------------------

    private static JobTriggerPoolHelper helper = new JobTriggerPoolHelper();

    public static void toStart() {
        helper.start();
    }

    /**
     * @param jobId:任务ID
     * @param triggerType:触发枚举
     * @param failRetryCount:
     * @param executorShardingParam:
     * @param executorParam:
     * @param addressList:
     * @Description:
     * @return: void
     **/
    public static void trigger(int jobId,
                               TriggerTypeEnum triggerType,
                               int failRetryCount,
                               String executorShardingParam,
                               String executorParam,
                               String addressList) {
        helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
    }


    /**
     * @param jobId:
     * @param triggerType:
     * @param failRetryCount:
     * @param executorShardingParam:
     * @param executorParam:
     * @param addressList:
     * @Description:
     * @return: void
     **/
    private void addTrigger(final int jobId,
                            final TriggerTypeEnum triggerType,
                            final int failRetryCount,
                            final String executorShardingParam,
                            final String executorParam,
                            final String addressList) {
        // choose thread pool
        ThreadPoolExecutor triggerPool = fastTriggerPool;
        AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
        // job-timeout 10 times in 1 min
        if (jobTimeoutCount != null && jobTimeoutCount.get() > 10) {
            triggerPool = slowTriggerPool;
        }

        triggerPool.execute(new Runnable() {
            @Override
            public void run() {
                long start = System.currentTimeMillis();
                // do trigger
                try {
                    XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam,
                            addressList);
                } catch (Exception e) {
                    LOGGER.error(e.getMessage(), e);
                } finally {
                    // check timeout-count-map
                    long minTim_now = System.currentTimeMillis() / 60000;
                    if (minTim != minTim_now) {
                        minTim = minTim_now;
                        jobTimeoutCountMap.clear();
                    }

                    // incr timeout-count-map
                    long cost = System.currentTimeMillis() - start;
                    if (cost > 500) {       // ob-timeout threshold 500ms
                        AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
                        if (timeoutCount != null) {
                            timeoutCount.incrementAndGet();
                        }
                    }
                }
            }
        });
    }
}
